{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Congestion Charging - Easy\n",
    "\n",
    "![rel](https://sqlzoo.net/w/images/f/f6/CongestionCharge.png)\n",
    "\n",
    "camera(**id**, perim)\n",
    "\n",
    "keeper(**id**, name, address)\n",
    "\n",
    "vehicle(**id**, keeper)\n",
    "\n",
    "image(**_camera_**, **whn**, reg)\n",
    "\n",
    "permit(**_reg_**, **sDate**, chargeType)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Loading spark-stubs, spark-hive\n",
      "Adding Hive conf dir /opt/hive/conf to classpath\n",
      "Creating SparkSession\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "SLF4J: No SLF4J providers were found.\n",
      "SLF4J: Defaulting to no-operation (NOP) logger implementation\n",
      "SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<a target=\"_blank\" href=\"http://jupyter:4040\">Spark UI</a>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$                                  \n",
       "\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.log4j.{Level, Logger}\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.types._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.functions._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.expressions.Window\n",
       "\n",
       "\u001b[39m\n",
       "\u001b[36mspark\u001b[39m: \u001b[32mSparkSession\u001b[39m = org.apache.spark.sql.SparkSession@1a8bd03e"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import $ivy.`org.apache.spark::spark-sql:3.4.0`\n",
    "\n",
    "import org.apache.log4j.{Level, Logger}\n",
    "Logger.getLogger(\"org\").setLevel(Level.OFF)\n",
    "\n",
    "import org.apache.spark._\n",
    "import org.apache.spark.sql._\n",
    "import org.apache.spark.sql.types._\n",
    "import org.apache.spark.sql.functions._\n",
    "import org.apache.spark.sql.expressions.Window\n",
    "\n",
    "val spark = {\n",
    "    NotebookSparkSession.builder()\n",
    "    .progress(false)\n",
    "    .appName(\"app18-1\")\n",
    "    // .master(\"spark://192.168.31.31:7077\")\n",
    "    .master(\"local[*]\")\n",
    "    .config(\"spark.sql.warehouse.dir\", \n",
    "            \"hdfs://192.168.31.31:9000/user/hive/warehouse\") \n",
    "    .config(\"spark.cores.max\", \"4\") \n",
    "    .config(\"spark.executor.instances\", \"1\") \n",
    "    .config(\"spark.executor.cores\", \"2\") \n",
    "    .config(\"spark.executor.memory\", \"10g\") \n",
    "    .config(\"spark.shuffle.service.enabled\", \"false\") \n",
    "    .config(\"spark.dynamicAllocation.enabled\", \"false\") \n",
    "    .config(\"spark.sql.catalogImplementation\", \"hive\")\n",
    "    .config(\"spark.sql.repl.eagerEval.enabled\", \"true\")\n",
    "    .config(\"spark.driver.allowMultipleContexts\", \"true\")\n",
    "    .getOrCreate()\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[32mimport \u001b[39m\u001b[36mspark.implicits._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36mspark.sqlContext.implicits._\n",
       "\u001b[39m\n",
       "defined \u001b[32mfunction\u001b[39m \u001b[36msc\u001b[39m\n",
       "\u001b[36mhiveCxt\u001b[39m: \u001b[32msql\u001b[39m.\u001b[32mhive\u001b[39m.\u001b[32mHiveContext\u001b[39m = org.apache.spark.sql.hive.HiveContext@48683778"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import spark.implicits._\n",
    "import spark.sqlContext.implicits._\n",
    "def sc = spark.sparkContext\n",
    "val hiveCxt = new org.apache.spark.sql.hive.HiveContext(sc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "defined \u001b[32mclass\u001b[39m \u001b[36mRichDF\u001b[39m"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "// Credit to Aivean\n",
    "implicit class RichDF(val ds:DataFrame) {\n",
    "    def showHTML(limit: Int = 50, truncate: Int = 100) = {\n",
    "        import xml.Utility.escape\n",
    "        val data = ds.take(limit)\n",
    "        val header = ds.schema.fieldNames.toSeq        \n",
    "        val rows: Seq[Seq[String]] = data.map { row =>\n",
    "          row.toSeq.map {cell =>\n",
    "            val str = cell match {\n",
    "              case null => \"null\"\n",
    "              case binary: Array[Byte] => binary.map(\"%02X\".format(_)).mkString(\"[\", \" \", \"]\")\n",
    "              case array: Array[_] => array.mkString(\"[\", \", \", \"]\")\n",
    "              case seq: Seq[_] => seq.mkString(\"[\", \", \", \"]\")\n",
    "              case _ => cell.toString\n",
    "            }\n",
    "            if (truncate > 0 && str.length > truncate) {\n",
    "              // do not show ellipses for strings shorter than 4 characters.\n",
    "              if (truncate < 4) str.substring(0, truncate)\n",
    "              else str.substring(0, truncate - 3) + \"...\"\n",
    "            } else {\n",
    "              str\n",
    "            }\n",
    "          }: Seq[String]\n",
    "        }\n",
    "    publish.html(s\"\"\" <table>\n",
    "                <tr>\n",
    "                 ${header.map(h => s\"<th>${escape(h)}</th>\").mkString}\n",
    "                </tr>\n",
    "                ${rows.map {row =>\n",
    "                  s\"<tr>${row.map{c => s\"<td>${escape(c)}</td>\" }.mkString}</tr>\"\n",
    "                }.mkString}\n",
    "            </table>\n",
    "        \"\"\")\n",
    "    }\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[36mcamera\u001b[39m: \u001b[32mDataFrame\u001b[39m = [id: int, perim: string]\n",
       "\u001b[36mkeeper\u001b[39m: \u001b[32mDataFrame\u001b[39m = [id: int, name: string ... 1 more field]\n",
       "\u001b[36mvehicle\u001b[39m: \u001b[32mDataFrame\u001b[39m = [id: string, keeper: int]\n",
       "\u001b[36mimage\u001b[39m: \u001b[32mDataFrame\u001b[39m = [camera: int, whn: string ... 1 more field]\n",
       "\u001b[36mpermit\u001b[39m: \u001b[32mDataFrame\u001b[39m = [reg: string, sdate: string ... 1 more field]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val camera = hiveCxt.table(\"sqlzoo.camera\")\n",
    "val keeper = hiveCxt.table(\"sqlzoo.keeper\")\n",
    "val vehicle = hiveCxt.table(\"sqlzoo.vehicle\")\n",
    "val image = hiveCxt.table(\"sqlzoo.image\")\n",
    "val permit = hiveCxt.table(\"sqlzoo.permit\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Sample query\n",
    "\n",
    "List the vehicles for which \"Strenuous, Sam\" is the registered keeper. The link between Keepers and Vehicles is via the foreign key specified in the CREATE TABLE vehicle statement. Note the line:\n",
    "\n",
    "```\n",
    " ,FOREIGN KEY(keeper) REFERENCES keeper(id)\n",
    "```\n",
    "\n",
    "This will be the basis of our join condition."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1.\n",
    "Show the name and address of the keeper of vehicle SO 02 PSP."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>name</th><th>address</th>\n",
       "                </tr>\n",
       "                <tr><td>Strenuous, Sam</td><td>Surjection Street</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(keeper.join(vehicle.filter(vehicle(\"id\") === \"SO 02 PSP\"),\n",
    "             (keeper(\"id\") === vehicle(\"keeper\")))\n",
    " .select(\"name\", \"address\")\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2.\n",
    "Show the number of cameras that take images for incoming vehicles."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>cnt</th>\n",
       "                </tr>\n",
       "                <tr><td>8</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(camera.filter(col(\"perim\") === \"IN\")\n",
    " .groupBy()\n",
    " .agg(count(\"id\").alias(\"cnt\"))\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3.\n",
    "List the image details taken by Camera 10 before 26 Feb 2007."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>camera</th><th>whn</th><th>reg</th><th>id</th><th>perim</th>\n",
       "                </tr>\n",
       "                <tr><td>1</td><td>2007-02-25 06:10:13.0</td><td>SO 02 ASP</td><td>1</td><td>IN</td></tr><tr><td>17</td><td>2007-02-25 06:20:01.0</td><td>SO 02 ASP</td><td>17</td><td>null</td></tr><tr><td>18</td><td>2007-02-25 06:23:40.0</td><td>SO 02 ASP</td><td>18</td><td>null</td></tr><tr><td>9</td><td>2007-02-25 06:26:04.0</td><td>SO 02 ASP</td><td>9</td><td>OUT</td></tr><tr><td>17</td><td>2007-02-25 06:57:31.0</td><td>SO 02 CSP</td><td>17</td><td>null</td></tr><tr><td>17</td><td>2007-02-25 07:00:40.0</td><td>SO 02 CSP</td><td>17</td><td>null</td></tr><tr><td>12</td><td>2007-02-25 07:04:31.0</td><td>SO 02 CSP</td><td>12</td><td>OUT</td></tr><tr><td>5</td><td>2007-02-25 07:10:00.0</td><td>SO 02 GSP</td><td>5</td><td>IN</td></tr><tr><td>16</td><td>2007-02-25 07:13:00.0</td><td>SO 02 GSP</td><td>16</td><td>OUT</td></tr><tr><td>2</td><td>2007-02-25 07:20:01.0</td><td>SO 02 TSP</td><td>2</td><td>IN</td></tr><tr><td>19</td><td>2007-02-25 07:23:00.0</td><td>SO 02 TSP</td><td>19</td><td>null</td></tr><tr><td>19</td><td>2007-02-25 07:26:31.0</td><td>SO 02 TSP</td><td>19</td><td>null</td></tr><tr><td>19</td><td>2007-02-25 07:29:00.0</td><td>SO 02 TSP</td><td>19</td><td>null</td></tr><tr><td>8</td><td>2007-02-25 07:35:41.0</td><td>SO 02 CSP</td><td>8</td><td>IN</td></tr><tr><td>18</td><td>2007-02-25 07:39:04.0</td><td>SO 02 CSP</td><td>18</td><td>null</td></tr><tr><td>18</td><td>2007-02-25 07:42:30.0</td><td>SO 02 CSP</td><td>18</td><td>null</td></tr><tr><td>10</td><td>2007-02-25 07:45:11.0</td><td>SO 02 CSP</td><td>10</td><td>OUT</td></tr><tr><td>8</td><td>2007-02-25 07:48:10.0</td><td>SO 02 CSP</td><td>8</td><td>IN</td></tr><tr><td>19</td><td>2007-02-25 07:51:10.0</td><td>SO 02 CSP</td><td>19</td><td>null</td></tr><tr><td>18</td><td>2007-02-25 07:55:11.0</td><td>SO 02 CSP</td><td>18</td><td>null</td></tr><tr><td>11</td><td>2007-02-25 07:58:01.0</td><td>SO 02 CSP</td><td>11</td><td>OUT</td></tr><tr><td>18</td><td>2007-02-25 16:28:40.0</td><td>SO 02 SSP</td><td>18</td><td>null</td></tr><tr><td>18</td><td>2007-02-25 16:29:11.0</td><td>SO 02 DSP</td><td>18</td><td>null</td></tr><tr><td>9</td><td>2007-02-25 16:31:01.0</td><td>SO 02 SSP</td><td>9</td><td>OUT</td></tr><tr><td>19</td><td>2007-02-25 16:31:01.0</td><td>SO 02 DSP</td><td>19</td><td>null</td></tr><tr><td>18</td><td>2007-02-25 16:38:31.0</td><td>SO 02 RSP</td><td>18</td><td>null</td></tr><tr><td>9</td><td>2007-02-25 16:39:10.0</td><td>SO 02 RSP</td><td>9</td><td>OUT</td></tr><tr><td>9</td><td>2007-02-25 16:45:04.0</td><td>SO 02 HSP</td><td>9</td><td>OUT</td></tr><tr><td>9</td><td>2007-02-25 16:48:11.0</td><td>SO 02 HSP</td><td>9</td><td>OUT</td></tr><tr><td>9</td><td>2007-02-25 16:51:30.0</td><td>SO 02 HSP</td><td>9</td><td>OUT</td></tr><tr><td>9</td><td>2007-02-25 16:58:01.0</td><td>SO 02 ISP</td><td>9</td><td>OUT</td></tr><tr><td>12</td><td>2007-02-25 17:01:13.0</td><td>SO 02 ISP</td><td>12</td><td>OUT</td></tr><tr><td>3</td><td>2007-02-25 17:07:00.0</td><td>SO 02 JSP</td><td>3</td><td>IN</td></tr><tr><td>18</td><td>2007-02-25 17:10:43.0</td><td>SO 02 JSP</td><td>18</td><td>null</td></tr><tr><td>19</td><td>2007-02-25 17:14:11.0</td><td>SO 02 JSP</td><td>19</td><td>null</td></tr><tr><td>3</td><td>2007-02-25 17:16:11.0</td><td>SO 02 ESP</td><td>3</td><td>IN</td></tr><tr><td>3</td><td>2007-02-25 17:17:03.0</td><td>SO 02 JSP</td><td>3</td><td>IN</td></tr><tr><td>19</td><td>2007-02-25 17:42:41.0</td><td>SO 02 DSP</td><td>19</td><td>null</td></tr><tr><td>11</td><td>2007-02-25 18:08:00.0</td><td>SO 02 FSP</td><td>11</td><td>OUT</td></tr><tr><td>12</td><td>2007-02-25 18:08:13.0</td><td>SO 02 GSP</td><td>12</td><td>OUT</td></tr><tr><td>10</td><td>2007-02-25 18:08:40.0</td><td>SO 02 ESP</td><td>10</td><td>OUT</td></tr><tr><td>10</td><td>2007-02-25 18:23:11.0</td><td>SO 02 MUP</td><td>10</td><td>OUT</td></tr><tr><td>11</td><td>2007-02-25 18:26:13.0</td><td>SO 02 NUP</td><td>11</td><td>OUT</td></tr><tr><td>12</td><td>2007-02-25 18:29:01.0</td><td>SO 02 OUP</td><td>12</td><td>OUT</td></tr><tr><td>3</td><td>2007-02-25 18:33:10.0</td><td>SO 02 PUP</td><td>3</td><td>IN</td></tr><tr><td>15</td><td>2007-02-25 18:36:31.0</td><td>SO 02 PUP</td><td>15</td><td>OUT</td></tr><tr><td>3</td><td>2007-02-25 18:39:10.0</td><td>SO 02 PUP</td><td>3</td><td>IN</td></tr><tr><td>9</td><td>2007-02-25 18:54:30.0</td><td>SO 02 DSP</td><td>9</td><td>OUT</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(image.filter(image(\"whn\") < \"2007-02-26\")\n",
    " .join(camera, (image(\"camera\") === camera(\"id\")))\n",
    " .orderBy(\"whn\", \"camera\")\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 4.\n",
    "List the number of images taken by each camera. Your answer should show how many images have been taken by camera 1, camera 2 etc. The list must NOT include the images taken by camera 15, 16, 17, 18 and 19."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>camera</th><th>cnt</th>\n",
       "                </tr>\n",
       "                <tr><td>12</td><td>4</td></tr><tr><td>1</td><td>1</td></tr><tr><td>3</td><td>5</td></tr><tr><td>5</td><td>1</td></tr><tr><td>9</td><td>8</td></tr><tr><td>8</td><td>2</td></tr><tr><td>10</td><td>4</td></tr><tr><td>11</td><td>3</td></tr><tr><td>2</td><td>1</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(image.filter(! image(\"camera\").between(15, 19))\n",
    " .groupBy(\"camera\")\n",
    " .agg(count(\"reg\").alias(\"cnt\"))\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 5.\n",
    "A number of vehicles have permits that start on 30th Jan 2007. List the name and address for each keeper in alphabetical order without duplication."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>name</th><th>address</th>\n",
       "                </tr>\n",
       "                <tr><td>Ambiguous, Arthur</td><td>Absorption Ave.</td></tr><tr><td>Assiduous, Annie</td><td>Attribution Alley</td></tr><tr><td>Contiguous, Carol</td><td>Circumscription Close</td></tr><tr><td>Strenuous, Sam</td><td>Surjection Street</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(keeper.withColumnRenamed(\"id\", \"keeper\")\n",
    " .join(vehicle, \"keeper\")\n",
    " .join(permit.filter(to_date(permit(\"sdate\")) === \"2007-01-30\"),\n",
    "       (vehicle(\"id\") === permit(\"reg\")))\n",
    " .select(\"name\", \"address\")\n",
    " .distinct()\n",
    " .orderBy(\"name\")\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Scala",
   "language": "scala",
   "name": "scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".sc",
   "mimetype": "text/x-scala",
   "name": "scala",
   "nbconvert_exporter": "script",
   "version": "2.12.15"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
